package cn.doitedu.api;

import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowStagger;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.runtime.operators.window.state.WindowState;
import org.apache.flink.util.Collector;


/**
 * @Author: 深似海
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2024/2/28
 * @Desc: 学大数据，上多易教育
 *
 *   滚动时间窗口演示
 **/
public class _23_GlobalWindow_Time_Tumbling_Demo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStateBackend(new HashMapStateBackend());
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");

        DataStreamSource<String> stream = env.socketTextStream("doitedu", 8899);

        stream.map(Integer::parseInt)
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))  // 传入一个窗口分配器：滚动处理时间窗口分配器
                .process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
                    @Override
                    public void process(ProcessAllWindowFunction<Integer, Integer, TimeWindow>.Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {

                        TimeWindow window = context.window();
                        long windowStart = window.getStart();
                        long windowEnd = window.getEnd();
                        System.out.println("当前窗口是: [" + windowStart + "," + windowEnd + ")");

                        int sum = 0;
                        for (Integer element : elements) {
                            sum += element;
                        }

                        out.collect(sum);
                    }
                }).print();

        env.execute();


    }
}
